package cn.dfun.sample.flink.apitest

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
  * Elasticsearch sink test.
  * Related scripts for checking the result:
  * curl "node-01:9200/_cat/indices?v"
  * curl "node-01:9200/sensor/_search?pretty"
  */
object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputPath = "C:\\wor\\flink-sample\\src\\main\\resources\\sensor"
    val inputStream = env.readTextFile(inputPath)
    // Wrap each "id,timestamp,temperature" line into a SensorReading case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
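    // Elasticsearch node(s) to connect to; the ES6 connector talks to the REST endpoint on port 9200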
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("node-01", 9200))
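    // The ElasticsearchSinkFunction turns each SensorReading into an IndexRequest and hands it
    // to the RequestIndexer, which buffers requests and flushes them to Elasticsearch in bulk.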
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Wrap the record fields into a Map that serves as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

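        // Build an IndexRequest for index "sensor" with document type "readingdata" (ES6 still requires a type)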
        val indexRequest = Requests.indexRequest()
          .index("sensor")
          .`type`("readingdata")
          .source(dataSource)
        requestIndexer.add(indexRequest)
      }
    }
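    // Attach the ES sink. For a quick local test you could also call setBulkFlushMaxActions(1)
    // on the builder so every record is flushed immediately instead of being batched.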
    dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](
      httpHosts, myEsSinkFunc
    ).build())

    env.execute("Es sink test")
  }
}
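
// SensorReading is assumed to be a case class defined elsewhere in this package, roughly:
//   case class SensorReading(id: String, timestamp: Long, temperature: Double)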
